knitr::opts_chunk$set(echo = TRUE, message = FALSE, warning = FALSE, fig.height = 4, fig.width = 7)
library(terra)
library(oharac)
library(data.table)
library(tidyverse)
library(here)
source(here('common_fxns.R'))Apply vulnerability scores to mapped species. Map out vulnerability to each stressor, first unweighted by species (each spp counts same, this script), then weighted by functional vulnerability (FEs with higher FV are given greater weight, next script). This script only includes species categorized into a functional entity.
This includes information on species vulnerability scores, functional entity ID and traits, as well as which species are mapped in which sources (including rangemap filenames).
spp_info_df <- assemble_spp_info_df(fe_only = TRUE) %>%
rename(vulnerability = stressor)mc_process_vuln <- function(spp_maps_df, spp_str_info_df) {
### Break into chunks for parallel processing
chunk_size <- 100000
n_chunks <- ceiling(6.5e6/chunk_size)
### while there are 6.56e6 cells in the raster, the highest ocean cell is 6475924
n_cores <- max(1, floor(32 / ceiling(nrow(spp_maps_df)/5e7)))
message('Using ', n_cores, ' cores for ', nrow(spp_maps_df), ' observations...')
# n_cores <- ceiling(32 / ceiling(9.7e8/4e7))
vuln <- spp_str_info_df$vulnerability %>% unique()
taxon <- spp_str_info_df$taxon %>% unique()
spp_vuln_df <- spp_str_info_df %>%
select(species, v_score) %>%
distinct()
# system.time({
result_list <- parallel::mclapply(1:n_chunks, mc.cores = n_cores,
FUN = function(n) { ### n <- 6
cell_id_min <- as.integer((n - 1) * chunk_size + 1)
cell_id_max <- as.integer(n * chunk_size)
message('Summarizing vuln to ', vuln, ' on ',
taxon, ': chunk ', n, ' of ', n_chunks, '...')
### filter to chunk cells and join vulnerability scores
chunk_spp_cells <- spp_maps_df %>%
filter(between(cell_id, cell_id_min, cell_id_max)) %>%
oharac::dt_join(spp_vuln_df, by = 'species', type = 'left')
### summarize unweighted mean/sdev
chunk_sum_unwt <- chunk_spp_cells %>%
data.table() %>%
.[, .(vuln_mean_unwt = mean(v_score),
vuln_sd_unwt = sd(v_score),
n_spp = length(unique(species))),
by = .(cell_id)]
return(chunk_sum_unwt)
})
if(check_tryerror(result_list)) {
stop('Try error results in mc_process_vuln for vuln ', vuln, '...')
}
message('Binding cell results for vuln ', vuln, '...')
result_df <- result_list %>%
data.table::rbindlist() %>%
filter(!is.na(cell_id))
}taxa_vec <- spp_info_df$taxon %>% unique() %>% sort()
vuln_vec <- spp_info_df$vulnerability %>% unique()
spp_info_df %>%
select(species, taxon) %>%
distinct() %>% pull(taxon) %>% table()## .
## cephalopods corals crustacea_arthropods
## 176 998 3193
## echinoderms elasmobranchs fish
## 967 1120 10305
## marine_mammals molluscs polychaetes
## 121 2891 568
## reptiles seabirds sponges
## 81 311 413
### Loop over taxa levels
for(t in taxa_vec) {
### t <- taxa_vec[6]
tx_spp <- spp_info_df %>%
filter(taxon == t)
tx_map_df <- tx_spp %>%
select(species, map_f) %>%
distinct()
message('Processing impacts for ', t, '...')
### if it exists, spp_maps_df is from a different taxon...
### remove it here; and if any vulns need to be processed, load
### it inside the vuln loop...
rm('spp_maps_df')
### Loop over vulns
for(v in vuln_vec) { ### v <- vuln_vec[15]
prms <- c('mean', 'sdev', 'nspp')
outfile_stem <- here_anx('vuln_maps_by_taxon/vuln_by_species/vuln_%s_%s_spp_%s.tif')
### vuln, taxon, parameter
outfiles <- sprintf(outfile_stem, v, t, prms) %>%
setNames(prms)
### unlink(outfiles)
if(all(file.exists(outfiles[c('mean', 'sdev')]))) {
### nspp rasts are collected into a single one later in the script;
### don't recreate those if the other two exist!
message('All files exist for vulnerability ', v, ' on ', t, '...')
next()
}
message('Processing vulnerability ', v, ' on ', t, '...')
### If not yet loaded for this taxon, load spp maps
if(!exists('spp_maps_df')) {
message('Loading species rangemaps for ', t, '...')
spp_maps_df <- collect_spp_rangemaps(spp_vec = tx_map_df$species,
file_vec = tx_map_df$map_f,
parallel = TRUE)
# spp_maps_df <- spp_maps_raw %>%
# calc_spp_cell_fv(spp_fe)
}
### grab vulnerability scores for this stressor/taxon combo and process maps
spp_str_info_df <- tx_spp %>%
filter(vulnerability == v)
str_vuln_map_df <- mc_process_vuln(spp_maps_df, spp_str_info_df)
message('Creating and saving unweighted mean vuln rasters for ',
v, ' on ', t, '...')
rast_mean_unwt <- map_to_mol(str_vuln_map_df, which = 'vuln_mean_unwt')
rast_sd_unwt <- map_to_mol(str_vuln_map_df, which = 'vuln_sd_unwt')
rast_nspp <- map_to_mol(str_vuln_map_df, which = 'n_spp')
writeRaster(rast_mean_unwt, outfiles['mean'], overwrite = TRUE)
writeRaster(rast_sd_unwt, outfiles['sdev'], overwrite = TRUE)
writeRaster(rast_nspp, outfiles['nspp'], overwrite = TRUE)
}
}For each vuln, pull in all taxon rasters, assemble into dataframe, and summarize to aggregate mean, sd, and nspp. This will require a pooled variance approach to backing out the standard deviation.
Pooled variance formula when variances not necessarily equal (from one of the responses here)
\[s^2_{x_1 \cup x_2} = \frac{(n_1-1)s^2_{x_1} + (n_2-1)s^2_{x_2}}{(n_1+n_2-1)} + \frac{n_1 n_2(\bar x_1 - \bar x_2)^2}{(n_1+n_2)(n_1+n_2-1)}\]
But how does this formula work for multiple groups? Seems like the correction factor gets increasingly complicated, but a sequential calculation, each time taking the result from one combo and combining it with a new set, should work. Here I define a function for a two-sample pooled variance, and then an iterated version that sequentially pools elements into the larger pool.
combine_taxa_maps <- function(str_tx_v_df) {
vuln <- str_tx_v_df$v %>% unique()
mean_fs <- str_tx_v_df %>% filter(p == 'mean')
sdev_fs <- str_tx_v_df %>% filter(p == 'sdev')
nspp_fs <- str_tx_v_df %>% filter(p == 'nspp')
message('... loading mean maps across all taxa for vuln ', vuln, '...')
mean_df <- parallel::mclapply(mean_fs$f, mc.cores = 24, FUN = r_to_df) %>%
setNames(mean_fs$t) %>%
data.table::rbindlist(idcol = 'taxon') %>%
rename(v_mean = val)
message('... loading std dev maps across all taxa for vuln ', vuln, '...')
sdev_df <- parallel::mclapply(sdev_fs$f, mc.cores = 24, FUN = r_to_df) %>%
setNames(sdev_fs$t) %>%
data.table::rbindlist(idcol = 'taxon') %>%
rename(v_sdev = val)
message('... loading nspp maps across all taxa for vuln ', vuln, '...')
nspp_df <- parallel::mclapply(nspp_fs$f, mc.cores = 24, FUN = r_to_df) %>%
setNames(nspp_fs$t) %>%
data.table::rbindlist(idcol = 'taxon') %>%
rename(v_nspp = val)
message('... joining mean, sd, nspp into big-ass dataframe for vuln ', vuln, '...')
big_df <- mean_df %>%
oharac::dt_join(sdev_df, by = c('taxon', 'cell_id'), type = 'full') %>%
oharac::dt_join(nspp_df, by = c('taxon', 'cell_id'), type = 'full')
return(big_df)
}
process_mean_rasts <- function(big_df) {
### Set up for parallel processing
cell_id_vec <- big_df$cell_id %>% unique()
n_gps <- 25
gp_vec <- rep(1:n_gps, length.out = length(cell_id_vec))
### perform parallel processing
spp_mean_list <- parallel::mclapply(
X = 1:n_gps, mc.cores = 25,
FUN = function(gp) { ### gp <- 1
gp_cells <- cell_id_vec[gp_vec == gp]
message('...processing ', length(gp_cells), ' cells in group ', gp, '...')
gp_out <- big_df %>%
filter(cell_id %in% gp_cells) %>%
data.table() %>%
.[, .(vuln_mean = sum(v_mean * v_nspp) / sum(v_nspp)),
by = .(cell_id)]
})
### gather results
spp_mean_df <- data.table::rbindlist(spp_mean_list)
return(spp_mean_df)
}
process_sdev_rasts <- function(big_df) {
### Set up for parallel processing
cell_id_vec <- big_df$cell_id %>% unique()
n_gps <- 40
gp_vec <- rep(1:n_gps, length.out = length(cell_id_vec))
### perform parallel processing
spp_sdev_list <- parallel::mclapply(
X = 1:n_gps, mc.cores = 20,
FUN = function(gp) { ### gp <- round(n_gps / 2)
gp_cells <- cell_id_vec[gp_vec == gp]
message('...processing ', length(gp_cells), ' cells in group ', gp,
' of ', n_gps, '...')
gp_sdev_out <- big_df %>%
filter(cell_id %in% gp_cells) %>%
data.table() %>%
.[, .(vuln_sdev = iterated_pooled_var(v_mean, v_sdev, v_nspp) %>% sqrt()),
by = .(cell_id)]
return(gp_sdev_out)
})
### gather results
spp_sdev <- data.table::rbindlist(spp_sdev_list)
return(spp_sdev)
}
r_to_df <- function(f) {
r <- terra::rast(f)
df <- data.frame(values(r),
cell_id = 1:ncell(r)) %>%
rename(val := !!names(r)) %>%
filter(!is.na(val))
return(df)
}taxon_map_dir <- here_anx('vuln_maps_by_taxon', 'vuln_by_species')
tx_v_map_df <- data.frame(f = list.files(taxon_map_dir, full.names = TRUE,
pattern = 'vuln_.+.tif')) %>%
mutate(t = str_extract(basename(f), paste0(taxa_vec, collapse = '|')),
v = str_extract(basename(f), paste0(vuln_vec, collapse = '|')),
p = str_extract(basename(f), '_mean|_sdev|_nspp') %>% str_remove('_'))
out_stem <- here_anx('_output/vuln_maps/vuln_maps_by_species/vuln_spp_%s_%s.tif')
### format will be: vuln, parameter (mean, sd, nspp)
for(vuln in vuln_vec) {
### vuln <- vuln_vec[23]
### check if total vuln maps are complete
outf_mean <- sprintf(out_stem, vuln, 'mean')
outf_sdev <- sprintf(out_stem, vuln, 'sdev')
outf_nspp <- sprintf(out_stem, vuln, 'nspp')
if(all(file.exists(outf_mean, outf_sdev))) {
message('All summary rasters exist for vuln ', vuln, '... skipping!')
next()
}
### Combine mean, sdev, and nspp maps by taxon into one big dataframe
message('Processing mean, sd, nspp maps across all species for ', vuln, '...')
str_tx_v_df <- tx_v_map_df %>%
filter(v == vuln)
# asdf <- str_tx_v_df %>% filter(p == 'mean') %>% pull(f)
# zxcv <- rast(asdf[6])
# plot(zxcv)
big_df <- combine_taxa_maps(str_tx_v_df)
### Process mean raster across taxa
if(!file.exists(outf_mean)) {
message('... summarizing mean vulnerability map across all taxa...')
spp_mean <- process_mean_rasts(big_df)
rast_mean <- map_to_mol(spp_mean, which = 'vuln_mean')
writeRaster(rast_mean, outf_mean, overwrite = TRUE)
} else {
message('... mean vulnerability map exists for vuln ', vuln, '... skipping!')
}
### Process standard deviation raster across taxa using pooled var
if(!file.exists(outf_sdev)) {
message('... summarizing standard deviation vulnerability map across all taxa...')
spp_sdev <- process_sdev_rasts(big_df)
rast_sdev <- map_to_mol(spp_sdev, which = 'vuln_sdev')
writeRaster(rast_sdev, outf_sdev, overwrite = TRUE)
} else {
message('... standard deviation map exists for vuln ', vuln, '... skipping!')
}
}These should all be the same. If so, copy one to the main output and delete the rest as redundant.
nspp_main_out_f <- here('_output/nspp_maps/species_richness.tif')
nspp_vuln_fs <- list.files(dirname(out_stem), pattern = 'nspp.tif', full.names = TRUE)
if(!file.exists(nspp_main_out_f)) {
### create the main spp richness raster from vuln-level richness rasters,
### which *should* all be identical.
if(length(nspp_vuln_fs) < length(vuln_vec)) {
stop('Species richness raster ', basename(nspp_main_out_f), ' is missing, but ',
'not all vuln-level species richness maps are available!')
}
nspp_list <- lapply(nspp_vuln_fs, raster::raster)
### if not all equal, this throws an error - stop and figure out why
check_nspp <- raster::compareRaster(nspp_list, values = TRUE, stopiffalse = TRUE)
### all are equal, so copy one to nspp_main_out_f and delete the vuln-level ones
writeRaster(nspp_list[[1]], nspp_main_out_f, overwrite = TRUE)
unlink(nspp_vuln_fs)
} else if(length(nspp_vuln_fs) > 0) {
### verify that the main out file is the same as any vuln-level ones
nspp_main <- raster::raster(nspp_main_out_f)
nspp_list <- lapply(nspp_vuln_fs, raster::raster)
### if not all equal, this throws an error - stop and figure out why
check_nspp <- raster::compareRaster(nspp_main, nspp_list, values = TRUE, stopiffalse = TRUE)
### if all good, delete the vuln-level maps as redundant
unlink(nspp_vuln_fs)
}vuln_spp_dir <- here_anx('_output/vuln_maps/vuln_maps_by_species')
mean_fs <- list.files(vuln_spp_dir, pattern = '_mean.tif', full.names = TRUE)
# sdev_fs <- list.files(vuln_spp_dir, pattern = '_sdev.tif', full.names = TRUE)
# focal_strs <- c('biomass_removal', 'bycatch',
# 'microplastic', 'wildlife_strike',
# 'nutrient_pollution', 'ocean_acidification',
# 'sst_rise', 'marine_heat_wave',
# 'light_pollution')
focal_strs <- basename(mean_fs) %>%
str_remove_all('vuln_spp_|_mean.tif')
for(f in focal_strs) { ### f <- focal_strs[15]
mean_f <- mean_fs[str_detect(basename(mean_fs), f)]
# sdev_f <- sdev_fs[str_detect(basename(sdev_fs), f)]
mean_rast <- terra::rast(mean_f)
# sdev_rast <- terra::rast(sdev_f)
map_cols <- hcl.colors(n = 50)
plot(mean_rast, zlim = c(0, 1), col = map_cols, main = paste0('Mean vuln: ', f),
legend = FALSE, axes = FALSE)
}